Create DataFrame From RDD

Converting a Spark RDD to a DataFrame can be done using either of:
  • toDF()
  • createDataFrame()
toDF(): Spark provides an implicit function toDF() that converts an RDD, Seq[T], or List[T] to a DataFrame. To use toDF(), first import the implicits with import spark.implicits._. By default, the columns are named “_1”, “_2”, and so on, one per tuple element, so an RDD of pairs yields the columns “_1” and “_2”.
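As a minimal sketch of the default naming, the snippet below calls toDF() with no arguments on an RDD of pairs. The sample data and the names pairs and defaultDf are made up for illustration, and the local SparkSession is an assumption (in spark-shell, spark and sc already exist).

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; skip this line in spark-shell, where `spark` is predefined.
val spark = SparkSession.builder.appName("toDF defaults").master("local[*]").getOrCreate()
import spark.implicits._ // brings toDF() into scope for RDDs and Seqs

// Hypothetical sample data: an RDD of two-element tuples.
val pairs = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2)))

// No column names are supplied, so the columns are named _1 and _2.
val defaultDf = pairs.toDF()
defaultDf.printSchema()
```

Because the names come from the tuple positions, a three-element tuple would add a “_3” column, and so on.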

toDF() has another signature that takes custom column names as arguments, as shown below.

val rdd = sc.parallelize(Seq(
  ("first", Array(2.0, 1.0, 2.1, 5.4)),
  ("test", Array(1.5, 0.5, 0.9, 3.7)),
  ("choose", Array(8.0, 2.9, 9.1, 2.5))))
val rdd1 = rdd.toDF("id", "value")
rdd1.show

val df = sc.parallelize(1 to 100)
  .map(a => (s"user$a", a * .123, a))
  .toDF("name", "score", "user_id")
df.show


By default, the data type of each column is inferred from the data. We can change this behavior by supplying a schema, in which we specify a column name, data type, and nullable flag for each field/column. In the example below, printSchema shows the inferred types: id is a string and value is an array of doubles.

val rdd = sc.parallelize(Seq(
  ("first", Array(2.0, 1.0, 2.1, 5.4)),
  ("test", Array(1.5, 0.5, 0.9, 3.7)),
  ("choose", Array(8.0, 2.9, 9.1, 2.5))))
val rdd1 = rdd.toDF("id", "value")
rdd1.printSchema
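To override the inferred schema rather than just inspect it, one option is to convert the tuples to Rows and pass an explicit StructType. This is only a sketch: the word and number columns, the sample values, and the names wordRows, wordSchema, and wordDf are hypothetical, and a local SparkSession is assumed.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Assumption: a local session; skip this line in spark-shell.
val spark = SparkSession.builder.appName("explicit schema").master("local[*]").getOrCreate()

// Hypothetical data: (word, number) pairs converted to generic Rows.
val wordRows = spark.sparkContext
  .parallelize(Seq(("alpha", 1), ("beta", 2)))
  .map { case (word, number) => Row(word, number) }

// Each StructField carries a name, a data type, and a nullable flag.
val wordSchema = StructType(Seq(
  StructField("word", StringType, nullable = true),
  StructField("number", IntegerType, nullable = false)))

val wordDf = spark.createDataFrame(wordRows, wordSchema)
wordDf.printSchema()
```

Here the number column is declared non-nullable and the word column nullable, so the printed schema reflects the supplied flags instead of the inferred ones.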


createDataFrame(): Using createDataFrame() from SparkSession is another way to create a DataFrame. It takes an RDD as an argument and can be chained with toDF() to name the columns. Another overload creates a DataFrame from an RDD containing Rows using the given schema.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

val rowsRdd: RDD[Row] = sc.parallelize(Seq(
  Row("first", 2.0, 7.0),
  Row("second", 3.5, 2.5),
  Row("third", 7.0, 5.9)))
val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("val1", DoubleType, true))
  .add(StructField("val2", DoubleType, true))
val df = spark.createDataFrame(rowsRdd, schema)
df.show()

df.printSchema
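The other form mentioned above, passing an RDD of tuples to createDataFrame() and chaining toDF() to name the columns, might look like the following sketch. The sample data and the names tupleRdd and namedDf are hypothetical, and a local SparkSession is assumed.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; skip this line in spark-shell.
val spark = SparkSession.builder.appName("createDataFrame with toDF").master("local[*]").getOrCreate()

// Hypothetical data: an RDD of (String, Double) tuples.
val tupleRdd = spark.sparkContext.parallelize(Seq(("first", 2.0), ("second", 3.5)))

// createDataFrame infers the schema from the tuple type (columns _1, _2);
// toDF then replaces the default names with the ones supplied.
val namedDf = spark.createDataFrame(tupleRdd).toDF("id", "val1")
namedDf.show()
```

This variant needs no Row objects or StructType, at the cost of leaving the data types and nullability to inference.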
 
 
Example
 
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val appName = "Scala Example - List to Spark Data Frame"
val master = "local"

/* Create Spark session (local mode; Hive support is not enabled here) */
val spark = SparkSession.builder.appName(appName).master(master).getOrCreate()

/* List */
val data = List(
  Row("Category A", 100, "This is category A"),
  Row("Category B", 120, "This is category B"),
  Row("Category C", 150, "This is category C"))

val schema = StructType(List(
  StructField("Category", StringType, true),
  StructField("Count", IntegerType, true),
  StructField("Description", StringType, true)))

/* Convert list to RDD */
val rdd = spark.sparkContext.parallelize(data)

/* Create data frame */
val df = spark.createDataFrame(rdd, schema)
print(df.schema)
df.show()
